package com.three.netty.rocketmq;

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.common.message.MessageExt;
import com.three.api.connection.Connection;
import com.three.api.protocol.Packet;
import com.three.api.service.BaseService;
import com.three.api.service.Listener;
import com.three.common.receiver.ReceiverType;
import com.three.config.common.IConfig;
import com.three.utils.JsonUtils;
import com.three.utils.ObjectUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;

/**
 * Created by Mathua on 2017/6/27.
 */
public class MQPushConsumerService extends BaseService {

    /**
     * 当前例子是PushConsumer用法，使用方式给用户感觉是消息从RocketMQ服务器推到了应用客户端。<br>
     * 但是实际PushConsumer内部是使用长轮询Pull方式从MetaQ服务器拉消息，然后再回调用户Listener方法<br>
     */
    public static final MQPushConsumerService I = new MQPushConsumerService();
    private static final Logger LOGGER = LoggerFactory.getLogger(MQPushConsumerService.class);

    @Override
    protected void doStart(Listener listener) throws Throwable {
        /**
         * 一个应用创建一个Consumer，由应用来维护此对象，可以设置为全局对象或者单例<br>
         * 注意：ConsumerGroupName需要由应用来保证唯一
         */
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(IConfig.chess.rocket_mq.consumer_group);
        consumer.setNamesrvAddr(IConfig.chess.rocket_mq.namesrv_addr);
        consumer.setInstanceName("Consumber");
        consumer.setVipChannelEnabled(false);

        /**
         * 订阅指定topic下tags分别等于TagA或TagC或TagD
         */
        consumer.subscribe(IConfig.chess.rocket_mq.topic, IConfig.chess.rocket_mq.tag);

        consumer.registerMessageListener(
                new MessageListenerConcurrently() {
                    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                        for(MessageExt msg : msgs) {
                            if(!msg.getTopic().equals(IConfig.chess.rocket_mq.topic))
                                continue;
                            if(msg.getTags() == null || !msg.getTags().equals(IConfig.chess.rocket_mq.tag))
                                continue;
                            MQKeys key = null;
                            if(msg.getKeys() != null) {
                                key = MQKeys.valueOfKey(msg.getKeys());
                                if(key != null && !key.check(msg.getKeys()))
                                    continue;
                            }
                            // 处理跨服消息
                            Packet packet = ObjectUtils.toT(msg.getBody());
                            Connection conn = null;
                            if(key != null)
                                conn = key.getConn(msg.getKeys());
                            LOGGER.error(JsonUtils.toJson(packet));
                            if(conn != null && packet != null) {
                                // 必要的，序列化的时候bytes数组不存在了
                                packet.parseFromBodyTmp();
                                ReceiverType.ROCKET_MQ_RECEIVER.getReceiver().onReceive(packet, conn);
                            }
                        }
                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                    }
                });

        /**
         * Consumer对象在使用之前必须要调用start初始化，初始化一次即可<br>
         */
        consumer.start();
        LOGGER.info("ConsumerStarted.");

        listener.onSuccess();
    }
}
